feat: Support On-Demand Repartition #14411

Open
wants to merge 25 commits into
base: main

Conversation

Weijun-H
Member

@Weijun-H Weijun-H commented Feb 2, 2025

Which issue does this PR close?

Closes #14287

Rationale for this change

  • Introduce prefer_round_robin_repartititon in the optimizer config; when it is false, every RoundRobinBatch repartition is replaced with OnDemandRepartition
  • Use an MPMC channel so that the repartition operator polls one batch only when an output partition requests it, instead of pre-assigning batches
  • Use the Tokio channel in OnDemandRepartitionExec instead of the customized distributed channel
/// The OnDemandRepartitionExec operator repartitions the input data based on a push-based model.
/// It is similar to the RepartitionExec operator, but it doesn't distribute the data to the output
/// partitions until the output partitions request the data.
///
/// When an output partition is polled, the operator sends its partition number to the shared partition channel; the prefetch buffer then distributes the data in the order in which the partition numbers arrive.
/// Each input stream has a prefetch buffer (channel) that distributes the data to the output partitions.
///
/// The following diagram illustrates the data flow of the OnDemandRepartitionExec operator with 3 output partitions for the input stream 1:
/// ```text
///         /\                     /\                     /\
///         ││                     ││                     ││
///         ││                     ││                     ││
///         ││                     ││                     ││
/// ┌───────┴┴────────┐    ┌───────┴┴────────┐    ┌───────┴┴────────┐
/// │     Stream      │    │     Stream      │    │     Stream      │
/// │       (1)       │    │       (2)       │    │       (3)       │
/// └────────┬────────┘    └───────┬─────────┘    └────────┬────────┘
///          │                     │                       │    / \
///          │                     │                       │    | |
///          │                     │                       │    | |
///          └────────────────┐    │    ┌──────────────────┘    | |
///                           │    │    │                       | |
///                           ▼    ▼    ▼                       | |
///                       ┌─────────────────┐                   | |
///  Send the partition   │partition channel│                   | |
///  number when polling  │                 │                   | |
///                       └────────┬────────┘                   | |
///                                │                            | |
///                                │                            | |
///                                │  Get the partition number  | |
///                                ▼  then send data            | |
///                       ┌─────────────────┐                   | |
///                       │ Prefetch Buffer │───────────────────┘ |
///                       │       (1)       │─────────────────────┘
///                       └─────────────────┘ Distribute data to the output partitions
///
/// ```
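
The difference between pre-assigning batches and handing them out on request can be illustrated with a small model. This is only a sketch in Python, not the Rust implementation from this PR: the function names `round_robin` and `on_demand`, the `requests` list, and the use of a `deque` as the partition channel are all assumptions made for illustration.

```python
from collections import deque

def round_robin(batches, num_partitions):
    """Pre-assign: batch i goes to partition i % num_partitions, requested or not."""
    outputs = [[] for _ in range(num_partitions)]
    for i, batch in enumerate(batches):
        outputs[i % num_partitions].append(batch)
    return outputs

def on_demand(batches, num_partitions, requests):
    """Hand out one batch per request, in the order the partition numbers arrive."""
    input_stream = iter(batches)
    partition_channel = deque(requests)  # partition numbers sent when polling
    outputs = [[] for _ in range(num_partitions)]
    while partition_channel:
        partition = partition_channel.popleft()
        batch = next(input_stream, None)
        if batch is None:
            break  # input exhausted
        outputs[partition].append(batch)
    return outputs

batches = [f"batch{i}" for i in range(6)]
# Hypothetical polling order: partition 0 polls four times,
# partition 2 polls twice, partition 1 never polls.
requests = [0, 2, 0, 0, 2, 0]

rr = round_robin(batches, 3)
od = on_demand(batches, 3, requests)
print("round robin:", rr)
print("on demand:  ", od)
```

In this model the idle partition 1 still receives two batches under round robin, whereas on demand it receives none and the busy partition 0 receives four.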

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

Benchmark

UPDATE:
I reviewed this PR again and verified that the optimizer's behavior remains consistent with RoundRobinBatch after incorporating OnDemandRepartitionExec. Based on the current results, performance is comparable for tpch and tpch_10. This PR also reduces excessive memory usage caused by prefetching.

--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    1.38ms │                              1.44ms │     no change │
│ QQuery 1     │   22.23ms │                             21.71ms │     no change │
│ QQuery 2     │   62.34ms │                             64.39ms │     no change │
│ QQuery 3     │   52.80ms │                             52.36ms │     no change │
│ QQuery 4     │  486.81ms │                            486.49ms │     no change │
│ QQuery 5     │  552.51ms │                            539.88ms │     no change │
│ QQuery 6     │   21.95ms │                             21.57ms │     no change │
│ QQuery 7     │   26.13ms │                             25.52ms │     no change │
│ QQuery 8     │  534.38ms │                            532.79ms │     no change │
│ QQuery 9     │  747.43ms │                            766.04ms │     no change │
│ QQuery 10    │  164.56ms │                            170.90ms │     no change │
│ QQuery 11    │  186.45ms │                            184.72ms │     no change │
│ QQuery 12    │  566.88ms │                            578.79ms │     no change │
│ QQuery 13    │  874.66ms │                            802.30ms │ +1.09x faster │
│ QQuery 14    │  535.38ms │                            528.55ms │     no change │
│ QQuery 15    │  589.57ms │                            590.98ms │     no change │
│ QQuery 16    │ 1184.76ms │                           1162.10ms │     no change │
│ QQuery 17    │ 1327.31ms │                           1107.68ms │ +1.20x faster │
│ QQuery 18    │ 4283.66ms │                           3589.80ms │ +1.19x faster │
│ QQuery 19    │   48.24ms │                             48.46ms │     no change │
│ QQuery 20    │  836.82ms │                            817.40ms │     no change │
│ QQuery 21    │ 1047.08ms │                           1044.21ms │     no change │
│ QQuery 22    │ 2191.31ms │                           2261.84ms │     no change │
│ QQuery 23    │ 6323.93ms │                           6412.42ms │     no change │
│ QQuery 24    │  319.52ms │                            329.45ms │     no change │
│ QQuery 25    │  271.98ms │                            284.84ms │     no change │
│ QQuery 26    │  362.57ms │                            359.23ms │     no change │
│ QQuery 27    │ 1160.97ms │                           1180.38ms │     no change │
│ QQuery 28    │ 9438.45ms │                           9427.73ms │     no change │
│ QQuery 29    │  456.50ms │                            466.33ms │     no change │
│ QQuery 30    │  597.09ms │                            585.79ms │     no change │
│ QQuery 31    │  600.90ms │                            587.15ms │     no change │
│ QQuery 32    │ 4586.09ms │                           4224.30ms │ +1.09x faster │
│ QQuery 33    │ 4954.57ms │                           4678.45ms │ +1.06x faster │
│ QQuery 34    │ 4676.92ms │                           4627.91ms │     no change │
│ QQuery 35    │  817.87ms │                            802.52ms │     no change │
│ QQuery 36    │  102.38ms │                            103.95ms │     no change │
│ QQuery 37    │   48.60ms │                             48.88ms │     no change │
│ QQuery 38    │   69.60ms │                             70.24ms │     no change │
│ QQuery 39    │  189.69ms │                            188.87ms │     no change │
│ QQuery 40    │   22.62ms │                             22.32ms │     no change │
│ QQuery 41    │   20.19ms │                             20.35ms │     no change │
│ QQuery 42    │   26.05ms │                             27.16ms │     no change │
└──────────────┴───────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                  │ 51391.06ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 49848.22ms │
│ Average Time (main)                                │  1195.14ms │
│ Average Time (on-demand-not-always-add-roundrobin) │  1159.26ms │
│ Queries Faster                                     │          5 │
│ Queries Slower                                     │          0 │
│ Queries with No Change                             │         38 │
└────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  72.78ms │                             71.16ms │     no change │
│ QQuery 2     │  12.99ms │                             13.07ms │     no change │
│ QQuery 3     │  22.70ms │                             22.43ms │     no change │
│ QQuery 4     │  10.85ms │                             11.08ms │     no change │
│ QQuery 5     │  35.37ms │                             34.96ms │     no change │
│ QQuery 6     │   4.17ms │                              4.25ms │     no change │
│ QQuery 7     │  67.77ms │                             66.79ms │     no change │
│ QQuery 8     │  16.21ms │                             15.53ms │     no change │
│ QQuery 9     │  39.59ms │                             37.89ms │     no change │
│ QQuery 10    │  32.83ms │                             32.30ms │     no change │
│ QQuery 11    │   6.17ms │                              5.75ms │ +1.07x faster │
│ QQuery 12    │  20.63ms │                             20.74ms │     no change │
│ QQuery 13    │  16.78ms │                             16.67ms │     no change │
│ QQuery 14    │   5.19ms │                              5.06ms │     no change │
│ QQuery 15    │  11.32ms │                             11.47ms │     no change │
│ QQuery 16    │  12.92ms │                             12.75ms │     no change │
│ QQuery 17    │  55.92ms │                             56.38ms │     no change │
│ QQuery 18    │ 119.63ms │                            121.32ms │     no change │
│ QQuery 19    │  24.76ms │                             24.22ms │     no change │
│ QQuery 20    │  19.84ms │                             20.35ms │     no change │
│ QQuery 21    │  83.18ms │                             84.99ms │     no change │
│ QQuery 22    │  19.67ms │                             18.49ms │ +1.06x faster │
└──────────────┴──────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (main)                                  │ 711.28ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 707.64ms │
│ Average Time (main)                                │  32.33ms │
│ Average Time (on-demand-not-always-add-roundrobin) │  32.17ms │
│ Queries Faster                                     │        2 │
│ Queries Slower                                     │        0 │
│ Queries with No Change                             │       20 │
└────────────────────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 1402.09ms │                           1342.40ms │     no change │
│ QQuery 2     │  142.49ms │                            131.85ms │ +1.08x faster │
│ QQuery 3     │  233.84ms │                            286.30ms │  1.22x slower │
│ QQuery 4     │  122.64ms │                            121.37ms │     no change │
│ QQuery 5     │  467.75ms │                            661.07ms │  1.41x slower │
│ QQuery 6     │  119.11ms │                            317.05ms │  2.66x slower │
│ QQuery 7     │ 1367.30ms │                           1556.33ms │  1.14x slower │
│ QQuery 8     │  312.72ms │                            743.68ms │  2.38x slower │
│ QQuery 9     │ 1094.74ms │                           1581.38ms │  1.44x slower │
│ QQuery 10    │  577.96ms │                            875.53ms │  1.51x slower │
│ QQuery 11    │  117.18ms │                            115.31ms │     no change │
│ QQuery 12    │  623.06ms │                            665.48ms │  1.07x slower │
│ QQuery 13    │  329.84ms │                            350.92ms │  1.06x slower │
│ QQuery 14    │  166.44ms │                             49.00ms │ +3.40x faster │
│ QQuery 15    │  124.20ms │                            159.39ms │  1.28x slower │
│ QQuery 16    │  111.84ms │                            104.72ms │ +1.07x faster │
│ QQuery 17    │  872.77ms │                            842.42ms │     no change │
│ QQuery 18    │ 4544.51ms │                           4226.43ms │ +1.08x faster │
│ QQuery 19    │  233.44ms │                            825.55ms │  3.54x slower │
│ QQuery 20    │  352.69ms │                            353.86ms │     no change │
│ QQuery 21    │ 2025.51ms │                           1949.02ms │     no change │
│ QQuery 22    │   99.36ms │                             98.60ms │     no change │
└──────────────┴───────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                  │ 15441.45ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 17357.64ms │
│ Average Time (main)                                │   701.88ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   788.98ms │
│ Queries Faster                                     │          4 │
│ Queries Slower                                     │         11 │
│ Queries with No Change                             │          7 │
└────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │ 101.98ms │                             95.95ms │ +1.06x faster │
│ QQuery 2     │  20.42ms │                             19.91ms │     no change │
│ QQuery 3     │  37.85ms │                             36.34ms │     no change │
│ QQuery 4     │  24.06ms │                             22.75ms │ +1.06x faster │
│ QQuery 5     │  55.01ms │                             54.61ms │     no change │
│ QQuery 6     │  18.25ms │                             17.84ms │     no change │
│ QQuery 7     │  75.71ms │                             74.00ms │     no change │
│ QQuery 8     │  47.84ms │                             49.70ms │     no change │
│ QQuery 9     │  70.18ms │                             66.34ms │ +1.06x faster │
│ QQuery 10    │  58.39ms │                             59.63ms │     no change │
│ QQuery 11    │  13.99ms │                             14.86ms │  1.06x slower │
│ QQuery 12    │  35.81ms │                             33.38ms │ +1.07x faster │
│ QQuery 13    │  33.49ms │                             32.24ms │     no change │
│ QQuery 14    │  29.12ms │                             29.82ms │     no change │
│ QQuery 15    │  42.91ms │                             42.21ms │     no change │
│ QQuery 16    │  14.83ms │                             14.64ms │     no change │
│ QQuery 17    │  91.42ms │                             95.82ms │     no change │
│ QQuery 18    │ 116.25ms │                            117.78ms │     no change │
│ QQuery 19    │  45.73ms │                             48.17ms │  1.05x slower │
│ QQuery 20    │  40.10ms │                             39.97ms │     no change │
│ QQuery 21    │  92.63ms │                             93.35ms │     no change │
│ QQuery 22    │  15.76ms │                             16.01ms │     no change │
└──────────────┴──────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main)                                  │ 1081.72ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 1075.33ms │
│ Average Time (main)                                │   49.17ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   48.88ms │
│ Queries Faster                                     │         4 │
│ Queries Slower                                     │         2 │
│ Queries with No Change                             │        16 │
└────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃      main ┃ on-demand-not-always-add-roundrobin ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  973.40ms │                            943.27ms │    no change │
│ QQuery 2     │  142.00ms │                            136.28ms │    no change │
│ QQuery 3     │  453.24ms │                            446.22ms │    no change │
│ QQuery 4     │  222.49ms │                            231.51ms │    no change │
│ QQuery 5     │  679.49ms │                            671.58ms │    no change │
│ QQuery 6     │  158.87ms │                            158.40ms │    no change │
│ QQuery 7     │  975.28ms │                            967.93ms │    no change │
│ QQuery 8     │  698.05ms │                            698.72ms │    no change │
│ QQuery 9     │  993.03ms │                           1097.19ms │ 1.10x slower │
│ QQuery 10    │  557.65ms │                            559.98ms │    no change │
│ QQuery 11    │   85.04ms │                             88.46ms │    no change │
│ QQuery 12    │  291.15ms │                            282.05ms │    no change │
│ QQuery 13    │  427.02ms │                            421.93ms │    no change │
│ QQuery 14    │  230.86ms │                            231.48ms │    no change │
│ QQuery 15    │  380.34ms │                            412.02ms │ 1.08x slower │
│ QQuery 16    │   96.16ms │                             96.86ms │    no change │
│ QQuery 17    │ 1080.50ms │                           1088.24ms │    no change │
│ QQuery 18    │ 1587.88ms │                           1587.75ms │    no change │
│ QQuery 19    │  397.81ms │                            395.33ms │    no change │
│ QQuery 20    │  379.61ms │                            378.36ms │    no change │
│ QQuery 21    │ 1365.67ms │                           1344.72ms │    no change │
│ QQuery 22    │  133.86ms │                            130.80ms │    no change │
└──────────────┴───────────┴─────────────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (main)                                  │ 12309.39ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 12369.07ms │
│ Average Time (main)                                │   559.52ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   562.23ms │
│ Queries Faster                                     │          0 │
│ Queries Slower                                     │          2 │
│ Queries with No Change                             │         20 │
└────────────────────────────────────────────────────┴────────────┘
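
To make the summaries above easier to compare, the relative change in total time can be computed directly from the "Total Time" rows. The snippet below is a convenience sketch: the figures are copied from the tables above, while the dictionary layout and the helper name `relative_change` are illustrative choices.

```python
# (main, on-demand-not-always-add-roundrobin) total times in milliseconds,
# copied from the "Benchmark Summary" tables above.
totals_ms = {
    "clickbench_partitioned": (51391.06, 49848.22),
    "tpch_mem_sf1": (711.28, 707.64),
    "tpch_mem_sf10": (15441.45, 17357.64),
    "tpch_sf1": (1081.72, 1075.33),
    "tpch_sf10": (12309.39, 12369.07),
}

def relative_change(main_ms, branch_ms):
    """Percentage change of the branch relative to main (negative means faster)."""
    return (branch_ms - main_ms) / main_ms * 100.0

changes = {name: relative_change(*pair) for name, pair in totals_ms.items()}
for name, pct in changes.items():
    print(f"{name:<24}{pct:+.1f}%")
```

By this measure clickbench_partitioned is about 3% faster in total and tpch_mem_sf10 is about 12% slower, while the other three benchmarks move by less than 1%.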

@github-actions github-actions bot added physical-expr Physical Expressions optimizer Optimizer rules core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) common Related to common crate proto Related to proto crate labels Feb 2, 2025
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 54db067 to 6ffe62c Compare February 2, 2025 16:23
@github-actions github-actions bot added the documentation Improvements or additions to documentation label Feb 2, 2025
@ozankabak
Contributor

@Weijun-H has been working on this with the Synnada team for a while. The initial benchmark results were promising, so we decided to continue development while receiving community feedback 🚀

@ozankabak
Contributor

This is still in somewhat early stages, and there is work to do. But it might be good to get feedback early on from the community as the performance of this code is somewhat sensitive to idioms used with channels etc.

Contributor

@mertak-synnada mertak-synnada left a comment

Thanks for all the work! I just left some comments.

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch 3 times, most recently from 69a3c4f to f6934d1 Compare February 6, 2025 14:42
@Weijun-H Weijun-H marked this pull request as ready for review February 6, 2025 15:20
@alamb
Contributor

alamb commented Feb 6, 2025

> This is still in somewhat early stages, and there is work to do. But it might be good to get feedback early on from the community as the performance of this code is somewhat sensitive to idioms used with channels etc.

Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement

For example, this branch appears to be basically the same

│ Total Time (main)                                │ 11767.43ms │
│ Total Time (on-demand-repartition-with-config)   │ 11787.12ms │

Are there any benchmarks that show a performance benefit of all this new code?

@ozankabak
Contributor

@Weijun-H did some benchmarks a while back and the approach seemed promising in TPCH/SF50.

@mertak-synnada will do a detailed review of this tomorrow and then @Weijun-H can run the latest benchmarks for us to see how the performance changes

@berkaysynnada
Contributor

> Maybe I am missing something, but the benchmark numbers reported above don't really show much of an improvement

This might be a silly question, but did you set the config flag for the on-demand-repartition-with-config branch? 😅

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from fa91ea3 to beacced Compare February 7, 2025 04:18
@Weijun-H
Member Author

Weijun-H commented Feb 7, 2025

I updated the latest benchmark results. It appears that the OnDemandRepartition improved performance on clickbench_partitioned and large datasets such as tpch_sf50. For tpch_sf1 and tpch_sf10, the results are similar. I will check again to ensure everything is functioning correctly in the coming days.
@ozankabak @alamb @berkaysynnada @mertak-synnada

UPDATE:
I reviewed this PR again and verified that the optimizer's behavior remains consistent with RoundRobinBatch after incorporating OnDemandRepartitionExec. Based on the current results, performance is comparable for tpch and tpch_10. Additionally, some queries in clickbench_partitioned benefit from this change. This PR also reduces excessive memory usage caused by prefetching.

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch 2 times, most recently from 2ac6849 to df119c3 Compare February 7, 2025 05:14
@Weijun-H Weijun-H marked this pull request as draft February 7, 2025 07:22
@github-actions github-actions bot added core Core DataFusion crate and removed core Core DataFusion crate labels Feb 7, 2025
@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 794951b to 8b71674 Compare February 8, 2025 14:56
@Weijun-H Weijun-H marked this pull request as ready for review February 8, 2025 15:06
@2010YOUY01
Contributor

Impressive work! I have a suggestion and a high-level question:

Suggestion

I think to justify this change, we have to make sure:

  • No performance regression (the benchmarks already show this)
  • A reduced memory footprint for queries in which batches can accumulate in RepartitionExec (as the original issue described)

I tried to check the memory usage for tpch-sf10 and clickbench, and there is no noticeable change for those queries.
Perhaps we should construct queries with this anti-pattern and demonstrate that memory usage can actually be reduced by this on-demand repartition executor?
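
As a rough illustration of the anti-pattern in question, the toy model below counts how many batches sit in buffers when one output partition consumes much more slowly than the others. It is a tick-based simulation under simplifying assumptions (one batch produced per tick, unbounded buffers, no prefetching), not a measurement of DataFusion; the function names and the `consume_every` parameter are hypothetical.

```python
def peak_buffered_round_robin(num_batches, consume_every):
    """consume_every[p]: partition p takes one batch every that many ticks."""
    n = len(consume_every)
    buffers = [0] * n
    peak = 0
    for tick in range(num_batches):
        buffers[tick % n] += 1  # producer pushes eagerly, one batch per tick
        peak = max(peak, sum(buffers))
        for p, every in enumerate(consume_every):
            if tick % every == 0 and buffers[p] > 0:
                buffers[p] -= 1  # partition p consumes one buffered batch
    return peak

def peak_buffered_on_demand(num_batches, consume_every):
    """A batch is pulled only in the tick in which a partition asks for it."""
    remaining = num_batches
    peak = 0
    tick = 0
    while remaining > 0:
        in_flight = 0
        for every in consume_every:
            if tick % every == 0 and remaining > 0:
                remaining -= 1
                in_flight += 1  # handed straight to the requesting partition
        peak = max(peak, in_flight)
        tick += 1
    return peak

# Two fast partitions and one that is ten times slower.
rates = [1, 1, 10]
rr_peak = peak_buffered_round_robin(100, rates)
od_peak = peak_buffered_on_demand(100, rates)
print(f"round robin peak: {rr_peak} batches, on demand peak: {od_peak} batches")
```

In this model the round-robin peak grows with the input size because the slow partition's buffer keeps filling, while the on-demand peak is bounded by the number of partitions. A benchmark query would need a similarly skewed consumer to show the effect.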

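Here is a script for checking memory usage in benchmark queries:

# This script should be placed under benchmarks/
#
# It relies on `/usr/bin/time -l` as found on macOS, which reports the
# maximum resident set size in bytes.
#
# Supported benchmarks are 'tpch' and 'clickbench'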
#
# Example usage:
# Run TPCH benchmark and save results:
#   python3 membench.py run --benchmark tpch --result tpch_main.csv
#   python3 membench.py run --benchmark tpch --result tpch_optimized.csv
#
# Compare results:
#   python3 membench.py compare tpch_main.csv tpch_optimized.csv

import subprocess
import re
import csv
import argparse

def human_readable_size(size):
    units = ["B", "K", "M", "G", "T"]
    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024.0
        index += 1
    return f"{size:.2f}{units[index]}"

def run_tpch_queries(label, result_file):
    results = []
    for query in range(1, 23):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench", 
            "tpch", "--format", "parquet", "--path", "./data/tpch_sf10", 
            "--query", str(query), "--iterations", "1"
        ]
        
        process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
        stderr_output = process.stderr
        
        match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
        max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
        results.append((query, max_rss))
    
    with open(result_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    
    print(f"Results saved to {result_file}")

def run_clickbench_queries(label, result_file):
    results = []
    for query in range(0, 43):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench", 
            "clickbench", "--path", "./data/hits.parquet", 
            "--queries-path", "./queries/clickbench/queries.sql", 
            "--query", str(query), "--iterations", "1"
        ]
        
        process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
        stderr_output = process.stderr
        
        match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
        max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
        results.append((query, max_rss))
    
    with open(result_file, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    
    print(f"Results saved to {result_file}")

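# Multipliers for the unit suffixes produced by human_readable_size
UNIT_FACTORS = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}

def parse_size(text):
    # Convert a string such as "982.59M" back to bytes so that values with
    # different unit suffixes (e.g. "982.59M" vs "1.05G") compare correctly
    return float(text[:-1]) * UNIT_FACTORS[text[-1]]

def compare_results(file1, file2):
    results1, results2 = {}, {}

    with open(file1, "r") as f1, open(file2, "r") as f2:
        reader1, reader2 = csv.reader(f1), csv.reader(f2)
        next(reader1)  # Skip header
        next(reader2)  # Skip header

        for row in reader1:
            results1[row[0]] = row[1]
        for row in reader2:
            results2[row[0]] = row[1]

    print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
    for query in results1:
        mem1 = results1[query]
        mem2 = results2.get(query, "N/A")

        if mem1 != "N/A" and mem2 != "N/A":
            size1 = parse_size(mem1)
            size2 = parse_size(mem2)
            ratio = size2 / size1 if size1 > 0 else 1.0
            # Report a ratio only when the change exceeds 5%
            change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No Change"
        else:
            change = "N/A"

        print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")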

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=["run", "compare"], help="Run benchmarks or compare results")
    parser.add_argument("--result", help="Output result file for benchmarking")
    parser.add_argument("--benchmark", choices=["tpch", "clickbench"], help="Specify which benchmark to run")
    parser.add_argument("file1", nargs="?", help="First result file for comparison")
    parser.add_argument("file2", nargs="?", help="Second result file for comparison")
    args = parser.parse_args()
    
    if args.mode == "run" and args.result and args.benchmark:
        if args.benchmark == "tpch":
            run_tpch_queries("run", args.result)
        elif args.benchmark == "clickbench":
            run_clickbench_queries("run", args.result)
    elif args.mode == "compare" and args.file1 and args.file2:
        compare_results(args.file1, args.file2)
    else:
        print("Invalid arguments. Use --help for usage information.")

if __name__ == "__main__":
    main()
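
The regular expression in the script matches the output of `time -l` on macOS. For readers on Linux, a variant that also accepts the GNU `time -v` format ("Maximum resident set size (kbytes): N") might look like the sketch below. The helper name `parse_max_rss_bytes` is hypothetical, and the command list in the script would also need `-v` instead of `-l`.

```python
import re

def parse_max_rss_bytes(time_stderr):
    """Return the peak resident set size in bytes, or None if it is not found."""
    # macOS `time -l` prints e.g. "  483328000  maximum resident set size" (bytes)
    match = re.search(r"(\d+)\s+maximum resident set size", time_stderr)
    if match:
        return int(match.group(1))
    # GNU `time -v` prints e.g. "Maximum resident set size (kbytes): 472000"
    match = re.search(r"Maximum resident set size \(kbytes\):\s*(\d+)", time_stderr)
    if match:
        return int(match.group(1)) * 1024
    return None

macos_sample = "        1.23 real\n           483328000  maximum resident set size\n"
gnu_sample = "\tMaximum resident set size (kbytes): 472000\n"
print(parse_max_rss_bytes(macos_sample))
print(parse_max_rss_bytes(gnu_sample))
```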

Results:

TPCH:
----
Query     Branch1   Branch2   Change
1         464.05M   460.78M   No Change
2         397.00M   412.77M   No Change
3         714.56M   630.64M   0.88X
4         408.53M   418.78M   No Change
5         741.30M   769.73M   No Change
6         390.02M   398.72M   No Change
7         3.41G     3.45G     No Change
8         1.08G     1.05G     No Change
9         2.37G     2.31G     No Change
10        1.11G     1.16G     No Change
11        260.78M   267.41M   No Change
12        429.95M   449.06M   No Change
13        675.67M   668.22M   No Change
14        666.56M   700.22M   No Change
15        673.66M   656.70M   No Change
16        485.81M   474.59M   No Change
17        605.38M   631.92M   No Change
18        3.26G     3.29G     No Change
19        500.77M   577.95M   1.15X
20        1.07G     1.05G     No Change
21        982.59M   978.69M   No Change
22        303.86M   302.14M   No Change

Clickbench:
...(no change)

Question

In my understanding, the new repartition executor is a wrapper around RepartitionExec that enables lazy evaluation, so it should support both RoundRobin and Hash repartitioning, right? This PR only swaps out RoundRobin; do you also plan to add on-demand hash repartitioning in the future?

@Dandandan
Contributor

Dandandan commented Feb 14, 2025

I ran some tests yesterday and I can confirm the runtime improvements.
However, I do see higher memory usage for some queries (TPC-H Query 18, I believe) than when using round-robin repartitioning. Are there ways to bring it down (e.g. by using bounded channels)?
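
For context on the bounded-channel idea: a bounded channel caps the number of buffered batches and makes the producer wait once the cap is reached. The sketch below shows that behaviour with Python's `queue.Queue`; it is an analogy only, and the capacity of 2 and the helper `try_send` are arbitrary illustrative choices. In Tokio, the corresponding primitive would be the bounded `mpsc::channel` created with a fixed capacity.

```python
import queue

# Bounded channel: at most `capacity` batches may be buffered at once.
capacity = 2
channel = queue.Queue(maxsize=capacity)

def try_send(batch):
    """Return True if the batch was buffered, False if the channel is full."""
    try:
        channel.put_nowait(batch)
        return True
    except queue.Full:
        # A blocking put() would wait here until the consumer frees a slot.
        return False

sent = [try_send(b) for b in ("batch0", "batch1", "batch2")]
received = channel.get_nowait()   # the consumer takes one batch
sent_after = try_send("batch2")   # a slot is free again, so this succeeds
print(sent, received, sent_after)
```

The trade-off is that a producer blocked on a full channel cannot run ahead, which limits memory but can also reduce throughput when consumers are uneven.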

@Weijun-H Weijun-H force-pushed the on-demand-repartition-with-config branch from 846afdd to ad72077 Compare February 15, 2025 06:13
@Weijun-H
Member Author

Weijun-H commented Feb 15, 2025

> I ran some tests yesterday and I can confirm the runtime improvements. However, I do see higher memory usage for some queries (TPC-H Query 18, I believe) than when using round-robin repartitioning. Are there ways to bring it down (e.g. by using bounded channels)?

I tried to avoid using yield_now when waiting for the child operator's data; this should lower memory usage.
Benchmark after adopting this approach:

The performance decreased in many cases, @Dandandan.

--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │                            1.36ms │                              1.44ms │  1.05x slower │
│ QQuery 1     │                           30.28ms │                             21.71ms │ +1.39x faster │
│ QQuery 2     │                           73.36ms │                             64.39ms │ +1.14x faster │
│ QQuery 3     │                           64.40ms │                             52.36ms │ +1.23x faster │
│ QQuery 4     │                          512.46ms │                            486.49ms │ +1.05x faster │
│ QQuery 5     │                          571.75ms │                            539.88ms │ +1.06x faster │
│ QQuery 6     │                           31.22ms │                             21.57ms │ +1.45x faster │
│ QQuery 7     │                           33.53ms │                             25.52ms │ +1.31x faster │
│ QQuery 8     │                          568.44ms │                            532.79ms │ +1.07x faster │
│ QQuery 9     │                          758.40ms │                            766.04ms │     no change │
│ QQuery 10    │                          180.18ms │                            170.90ms │ +1.05x faster │
│ QQuery 11    │                          202.26ms │                            184.72ms │ +1.09x faster │
│ QQuery 12    │                          601.67ms │                            578.79ms │     no change │
│ QQuery 13    │                          832.37ms │                            802.30ms │     no change │
│ QQuery 14    │                          581.32ms │                            528.55ms │ +1.10x faster │
│ QQuery 15    │                          625.30ms │                            590.98ms │ +1.06x faster │
│ QQuery 16    │                         1362.52ms │                           1162.10ms │ +1.17x faster │
│ QQuery 17    │                         1258.20ms │                           1107.68ms │ +1.14x faster │
│ QQuery 18    │                         3628.57ms │                           3589.80ms │     no change │
│ QQuery 19    │                           59.32ms │                             48.46ms │ +1.22x faster │
│ QQuery 20    │                          884.00ms │                            817.40ms │ +1.08x faster │
│ QQuery 21    │                         1066.61ms │                           1044.21ms │     no change │
│ QQuery 22    │                         1883.62ms │                           2261.84ms │  1.20x slower │
│ QQuery 23    │                         6562.12ms │                           6412.42ms │     no change │
│ QQuery 24    │                          337.47ms │                            329.45ms │     no change │
│ QQuery 25    │                          263.68ms │                            284.84ms │  1.08x slower │
│ QQuery 26    │                          366.64ms │                            359.23ms │     no change │
│ QQuery 27    │                         1185.09ms │                           1180.38ms │     no change │
│ QQuery 28    │                         8860.10ms │                           9427.73ms │  1.06x slower │
│ QQuery 29    │                          441.28ms │                            466.33ms │  1.06x slower │
│ QQuery 30    │                          625.09ms │                            585.79ms │ +1.07x faster │
│ QQuery 31    │                          552.68ms │                            587.15ms │  1.06x slower │
│ QQuery 32    │                         4358.41ms │                           4224.30ms │     no change │
│ QQuery 33    │                         6845.44ms │                           4678.45ms │ +1.46x faster │
│ QQuery 34    │                         7866.05ms │                           4627.91ms │ +1.70x faster │
│ QQuery 35    │                          841.92ms │                            802.52ms │     no change │
│ QQuery 36    │                           80.70ms │                            103.95ms │  1.29x slower │
│ QQuery 37    │                           33.40ms │                             48.88ms │  1.46x slower │
│ QQuery 38    │                           68.14ms │                             70.24ms │     no change │
│ QQuery 39    │                          144.39ms │                            188.87ms │  1.31x slower │
│ QQuery 40    │                           20.52ms │                             22.32ms │  1.09x slower │
│ QQuery 41    │                           20.79ms │                             20.35ms │     no change │
│ QQuery 42    │                           17.83ms │                             27.16ms │  1.52x slower │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 55302.89ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 49848.22ms │
│ Average Time (on-demand-repartition-with-config)   │  1286.11ms │
│ Average Time (on-demand-not-always-add-roundrobin) │  1159.26ms │
│ Queries Faster                                     │         19 │
│ Queries Slower                                     │         11 │
│ Queries with No Change                             │         13 │
└────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                           85.39ms │                             71.16ms │ +1.20x faster │
│ QQuery 2     │                           14.22ms │                             13.07ms │ +1.09x faster │
│ QQuery 3     │                           24.57ms │                             22.43ms │ +1.10x faster │
│ QQuery 4     │                           15.57ms │                             11.08ms │ +1.40x faster │
│ QQuery 5     │                           41.50ms │                             34.96ms │ +1.19x faster │
│ QQuery 6     │                            4.82ms │                              4.25ms │ +1.14x faster │
│ QQuery 7     │                           74.88ms │                             66.79ms │ +1.12x faster │
│ QQuery 8     │                           17.29ms │                             15.53ms │ +1.11x faster │
│ QQuery 9     │                           41.62ms │                             37.89ms │ +1.10x faster │
│ QQuery 10    │                           35.49ms │                             32.30ms │ +1.10x faster │
│ QQuery 11    │                            6.85ms │                              5.75ms │ +1.19x faster │
│ QQuery 12    │                           24.91ms │                             20.74ms │ +1.20x faster │
│ QQuery 13    │                           17.21ms │                             16.67ms │     no change │
│ QQuery 14    │                            5.16ms │                              5.06ms │     no change │
│ QQuery 15    │                           12.00ms │                             11.47ms │     no change │
│ QQuery 16    │                           12.75ms │                             12.75ms │     no change │
│ QQuery 17    │                           57.71ms │                             56.38ms │     no change │
│ QQuery 18    │                          124.63ms │                            121.32ms │     no change │
│ QQuery 19    │                           24.50ms │                             24.22ms │     no change │
│ QQuery 20    │                           20.86ms │                             20.35ms │     no change │
│ QQuery 21    │                           86.02ms │                             84.99ms │     no change │
│ QQuery 22    │                           18.71ms │                             18.49ms │     no change │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 766.66ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 707.64ms │
│ Average Time (on-demand-repartition-with-config)   │  34.85ms │
│ Average Time (on-demand-not-always-add-roundrobin) │  32.17ms │
│ Queries Faster                                     │       12 │
│ Queries Slower                                     │        0 │
│ Queries with No Change                             │       10 │
└────────────────────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃         Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ QQuery 1     │                         1335.99ms │                           1342.40ms │      no change │
│ QQuery 2     │                          121.22ms │                            131.85ms │   1.09x slower │
│ QQuery 3     │                          250.98ms │                            286.30ms │   1.14x slower │
│ QQuery 4     │                          125.97ms │                            121.37ms │      no change │
│ QQuery 5     │                          766.43ms │                            661.07ms │  +1.16x faster │
│ QQuery 6     │                          474.20ms │                            317.05ms │  +1.50x faster │
│ QQuery 7     │                         1190.87ms │                           1556.33ms │   1.31x slower │
│ QQuery 8     │                          584.23ms │                            743.68ms │   1.27x slower │
│ QQuery 9     │                         1150.48ms │                           1581.38ms │   1.37x slower │
│ QQuery 10    │                          898.01ms │                            875.53ms │      no change │
│ QQuery 11    │                          113.16ms │                            115.31ms │      no change │
│ QQuery 12    │                          321.89ms │                            665.48ms │   2.07x slower │
│ QQuery 13    │                          340.09ms │                            350.92ms │      no change │
│ QQuery 14    │                          566.53ms │                             49.00ms │ +11.56x faster │
│ QQuery 15    │                          123.92ms │                            159.39ms │   1.29x slower │
│ QQuery 16    │                           93.89ms │                            104.72ms │   1.12x slower │
│ QQuery 17    │                          912.10ms │                            842.42ms │  +1.08x faster │
│ QQuery 18    │                         4680.46ms │                           4226.43ms │  +1.11x faster │
│ QQuery 19    │                          857.78ms │                            825.55ms │      no change │
│ QQuery 20    │                          241.99ms │                            353.86ms │   1.46x slower │
│ QQuery 21    │                         1915.38ms │                           1949.02ms │      no change │
│ QQuery 22    │                           90.42ms │                             98.60ms │   1.09x slower │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 17155.98ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 17357.64ms │
│ Average Time (on-demand-repartition-with-config)   │   779.82ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   788.98ms │
│ Queries Faster                                     │          5 │
│ Queries Slower                                     │         10 │
│ Queries with No Change                             │          7 │
└────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                          106.51ms │                             95.95ms │ +1.11x faster │
│ QQuery 2     │                           20.61ms │                             19.91ms │     no change │
│ QQuery 3     │                           43.59ms │                             36.34ms │ +1.20x faster │
│ QQuery 4     │                           26.63ms │                             22.75ms │ +1.17x faster │
│ QQuery 5     │                           66.74ms │                             54.61ms │ +1.22x faster │
│ QQuery 6     │                           20.47ms │                             17.84ms │ +1.15x faster │
│ QQuery 7     │                           84.02ms │                             74.00ms │ +1.14x faster │
│ QQuery 8     │                           54.67ms │                             49.70ms │ +1.10x faster │
│ QQuery 9     │                           78.47ms │                             66.34ms │ +1.18x faster │
│ QQuery 10    │                           67.30ms │                             59.63ms │ +1.13x faster │
│ QQuery 11    │                           16.11ms │                             14.86ms │ +1.08x faster │
│ QQuery 12    │                           42.73ms │                             33.38ms │ +1.28x faster │
│ QQuery 13    │                           38.63ms │                             32.24ms │ +1.20x faster │
│ QQuery 14    │                           31.97ms │                             29.82ms │ +1.07x faster │
│ QQuery 15    │                           47.89ms │                             42.21ms │ +1.13x faster │
│ QQuery 16    │                           16.23ms │                             14.64ms │ +1.11x faster │
│ QQuery 17    │                          103.78ms │                             95.82ms │ +1.08x faster │
│ QQuery 18    │                          134.12ms │                            117.78ms │ +1.14x faster │
│ QQuery 19    │                           52.97ms │                             48.17ms │ +1.10x faster │
│ QQuery 20    │                           46.33ms │                             39.97ms │ +1.16x faster │
│ QQuery 21    │                          110.85ms │                             93.35ms │ +1.19x faster │
│ QQuery 22    │                           18.11ms │                             16.01ms │ +1.13x faster │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 1228.72ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 1075.33ms │
│ Average Time (on-demand-repartition-with-config)   │   55.85ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   48.88ms │
│ Queries Faster                                     │        21 │
│ Queries Slower                                     │         0 │
│ Queries with No Change                             │         1 │
└────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                          843.22ms │                            943.27ms │  1.12x slower │
│ QQuery 2     │                          123.24ms │                            136.28ms │  1.11x slower │
│ QQuery 3     │                          407.30ms │                            446.22ms │  1.10x slower │
│ QQuery 4     │                          198.17ms │                            231.51ms │  1.17x slower │
│ QQuery 5     │                          604.48ms │                            671.58ms │  1.11x slower │
│ QQuery 6     │                          136.38ms │                            158.40ms │  1.16x slower │
│ QQuery 7     │                          887.74ms │                            967.93ms │  1.09x slower │
│ QQuery 8     │                          628.50ms │                            698.72ms │  1.11x slower │
│ QQuery 9     │                         1009.86ms │                           1097.19ms │  1.09x slower │
│ QQuery 10    │                          570.20ms │                            559.98ms │     no change │
│ QQuery 11    │                           90.48ms │                             88.46ms │     no change │
│ QQuery 12    │                          299.50ms │                            282.05ms │ +1.06x faster │
│ QQuery 13    │                          421.75ms │                            421.93ms │     no change │
│ QQuery 14    │                          231.66ms │                            231.48ms │     no change │
│ QQuery 15    │                          384.69ms │                            412.02ms │  1.07x slower │
│ QQuery 16    │                           96.85ms │                             96.86ms │     no change │
│ QQuery 17    │                         1088.84ms │                           1088.24ms │     no change │
│ QQuery 18    │                         1874.24ms │                           1587.75ms │ +1.18x faster │
│ QQuery 19    │                          462.80ms │                            395.33ms │ +1.17x faster │
│ QQuery 20    │                          429.06ms │                            378.36ms │ +1.13x faster │
│ QQuery 21    │                         1564.34ms │                           1344.72ms │ +1.16x faster │
│ QQuery 22    │                          144.52ms │                            130.80ms │ +1.10x faster │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 12497.84ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 12369.07ms │
│ Average Time (on-demand-repartition-with-config)   │   568.08ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   562.23ms │
│ Queries Faster                                     │          6 │
│ Queries Slower                                     │         10 │
│ Queries with No Change                             │          6 │
└────────────────────────────────────────────────────┴────────────┘

@Dandandan
Contributor

I ran some tests yesterday and I can confirm the runtime improvements. However, I do see higher memory usage than with round-robin repartitioning, especially on some queries (TPC-H Query 18, I believe). Are there ways to bring it down (e.g. bounded channels or something else)?

I tried to avoid using yield_now when waiting for the child operator data; this should lower memory usage. Benchmark after adopting this approach:

Performance decreased in many cases, @Dandandan.

--------------------
Benchmark clickbench_partitioned.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │                            1.36ms │                              1.44ms │  1.05x slower │
│ QQuery 1     │                           30.28ms │                             21.71ms │ +1.39x faster │
│ QQuery 2     │                           73.36ms │                             64.39ms │ +1.14x faster │
│ QQuery 3     │                           64.40ms │                             52.36ms │ +1.23x faster │
│ QQuery 4     │                          512.46ms │                            486.49ms │ +1.05x faster │
│ QQuery 5     │                          571.75ms │                            539.88ms │ +1.06x faster │
│ QQuery 6     │                           31.22ms │                             21.57ms │ +1.45x faster │
│ QQuery 7     │                           33.53ms │                             25.52ms │ +1.31x faster │
│ QQuery 8     │                          568.44ms │                            532.79ms │ +1.07x faster │
│ QQuery 9     │                          758.40ms │                            766.04ms │     no change │
│ QQuery 10    │                          180.18ms │                            170.90ms │ +1.05x faster │
│ QQuery 11    │                          202.26ms │                            184.72ms │ +1.09x faster │
│ QQuery 12    │                          601.67ms │                            578.79ms │     no change │
│ QQuery 13    │                          832.37ms │                            802.30ms │     no change │
│ QQuery 14    │                          581.32ms │                            528.55ms │ +1.10x faster │
│ QQuery 15    │                          625.30ms │                            590.98ms │ +1.06x faster │
│ QQuery 16    │                         1362.52ms │                           1162.10ms │ +1.17x faster │
│ QQuery 17    │                         1258.20ms │                           1107.68ms │ +1.14x faster │
│ QQuery 18    │                         3628.57ms │                           3589.80ms │     no change │
│ QQuery 19    │                           59.32ms │                             48.46ms │ +1.22x faster │
│ QQuery 20    │                          884.00ms │                            817.40ms │ +1.08x faster │
│ QQuery 21    │                         1066.61ms │                           1044.21ms │     no change │
│ QQuery 22    │                         1883.62ms │                           2261.84ms │  1.20x slower │
│ QQuery 23    │                         6562.12ms │                           6412.42ms │     no change │
│ QQuery 24    │                          337.47ms │                            329.45ms │     no change │
│ QQuery 25    │                          263.68ms │                            284.84ms │  1.08x slower │
│ QQuery 26    │                          366.64ms │                            359.23ms │     no change │
│ QQuery 27    │                         1185.09ms │                           1180.38ms │     no change │
│ QQuery 28    │                         8860.10ms │                           9427.73ms │  1.06x slower │
│ QQuery 29    │                          441.28ms │                            466.33ms │  1.06x slower │
│ QQuery 30    │                          625.09ms │                            585.79ms │ +1.07x faster │
│ QQuery 31    │                          552.68ms │                            587.15ms │  1.06x slower │
│ QQuery 32    │                         4358.41ms │                           4224.30ms │     no change │
│ QQuery 33    │                         6845.44ms │                           4678.45ms │ +1.46x faster │
│ QQuery 34    │                         7866.05ms │                           4627.91ms │ +1.70x faster │
│ QQuery 35    │                          841.92ms │                            802.52ms │     no change │
│ QQuery 36    │                           80.70ms │                            103.95ms │  1.29x slower │
│ QQuery 37    │                           33.40ms │                             48.88ms │  1.46x slower │
│ QQuery 38    │                           68.14ms │                             70.24ms │     no change │
│ QQuery 39    │                          144.39ms │                            188.87ms │  1.31x slower │
│ QQuery 40    │                           20.52ms │                             22.32ms │  1.09x slower │
│ QQuery 41    │                           20.79ms │                             20.35ms │     no change │
│ QQuery 42    │                           17.83ms │                             27.16ms │  1.52x slower │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 55302.89ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 49848.22ms │
│ Average Time (on-demand-repartition-with-config)   │  1286.11ms │
│ Average Time (on-demand-not-always-add-roundrobin) │  1159.26ms │
│ Queries Faster                                     │         19 │
│ Queries Slower                                     │         11 │
│ Queries with No Change                             │         13 │
└────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_mem_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                           85.39ms │                             71.16ms │ +1.20x faster │
│ QQuery 2     │                           14.22ms │                             13.07ms │ +1.09x faster │
│ QQuery 3     │                           24.57ms │                             22.43ms │ +1.10x faster │
│ QQuery 4     │                           15.57ms │                             11.08ms │ +1.40x faster │
│ QQuery 5     │                           41.50ms │                             34.96ms │ +1.19x faster │
│ QQuery 6     │                            4.82ms │                              4.25ms │ +1.14x faster │
│ QQuery 7     │                           74.88ms │                             66.79ms │ +1.12x faster │
│ QQuery 8     │                           17.29ms │                             15.53ms │ +1.11x faster │
│ QQuery 9     │                           41.62ms │                             37.89ms │ +1.10x faster │
│ QQuery 10    │                           35.49ms │                             32.30ms │ +1.10x faster │
│ QQuery 11    │                            6.85ms │                              5.75ms │ +1.19x faster │
│ QQuery 12    │                           24.91ms │                             20.74ms │ +1.20x faster │
│ QQuery 13    │                           17.21ms │                             16.67ms │     no change │
│ QQuery 14    │                            5.16ms │                              5.06ms │     no change │
│ QQuery 15    │                           12.00ms │                             11.47ms │     no change │
│ QQuery 16    │                           12.75ms │                             12.75ms │     no change │
│ QQuery 17    │                           57.71ms │                             56.38ms │     no change │
│ QQuery 18    │                          124.63ms │                            121.32ms │     no change │
│ QQuery 19    │                           24.50ms │                             24.22ms │     no change │
│ QQuery 20    │                           20.86ms │                             20.35ms │     no change │
│ QQuery 21    │                           86.02ms │                             84.99ms │     no change │
│ QQuery 22    │                           18.71ms │                             18.49ms │     no change │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃          ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 766.66ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 707.64ms │
│ Average Time (on-demand-repartition-with-config)   │  34.85ms │
│ Average Time (on-demand-not-always-add-roundrobin) │  32.17ms │
│ Queries Faster                                     │       12 │
│ Queries Slower                                     │        0 │
│ Queries with No Change                             │       10 │
└────────────────────────────────────────────────────┴──────────┘
--------------------
Benchmark tpch_mem_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃         Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ QQuery 1     │                         1335.99ms │                           1342.40ms │      no change │
│ QQuery 2     │                          121.22ms │                            131.85ms │   1.09x slower │
│ QQuery 3     │                          250.98ms │                            286.30ms │   1.14x slower │
│ QQuery 4     │                          125.97ms │                            121.37ms │      no change │
│ QQuery 5     │                          766.43ms │                            661.07ms │  +1.16x faster │
│ QQuery 6     │                          474.20ms │                            317.05ms │  +1.50x faster │
│ QQuery 7     │                         1190.87ms │                           1556.33ms │   1.31x slower │
│ QQuery 8     │                          584.23ms │                            743.68ms │   1.27x slower │
│ QQuery 9     │                         1150.48ms │                           1581.38ms │   1.37x slower │
│ QQuery 10    │                          898.01ms │                            875.53ms │      no change │
│ QQuery 11    │                          113.16ms │                            115.31ms │      no change │
│ QQuery 12    │                          321.89ms │                            665.48ms │   2.07x slower │
│ QQuery 13    │                          340.09ms │                            350.92ms │      no change │
│ QQuery 14    │                          566.53ms │                             49.00ms │ +11.56x faster │
│ QQuery 15    │                          123.92ms │                            159.39ms │   1.29x slower │
│ QQuery 16    │                           93.89ms │                            104.72ms │   1.12x slower │
│ QQuery 17    │                          912.10ms │                            842.42ms │  +1.08x faster │
│ QQuery 18    │                         4680.46ms │                           4226.43ms │  +1.11x faster │
│ QQuery 19    │                          857.78ms │                            825.55ms │      no change │
│ QQuery 20    │                          241.99ms │                            353.86ms │   1.46x slower │
│ QQuery 21    │                         1915.38ms │                           1949.02ms │      no change │
│ QQuery 22    │                           90.42ms │                             98.60ms │   1.09x slower │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴────────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 17155.98ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 17357.64ms │
│ Average Time (on-demand-repartition-with-config)   │   779.82ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   788.98ms │
│ Queries Faster                                     │          5 │
│ Queries Slower                                     │         10 │
│ Queries with No Change                             │          7 │
└────────────────────────────────────────────────────┴────────────┘
--------------------
Benchmark tpch_sf1.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                          106.51ms │                             95.95ms │ +1.11x faster │
│ QQuery 2     │                           20.61ms │                             19.91ms │     no change │
│ QQuery 3     │                           43.59ms │                             36.34ms │ +1.20x faster │
│ QQuery 4     │                           26.63ms │                             22.75ms │ +1.17x faster │
│ QQuery 5     │                           66.74ms │                             54.61ms │ +1.22x faster │
│ QQuery 6     │                           20.47ms │                             17.84ms │ +1.15x faster │
│ QQuery 7     │                           84.02ms │                             74.00ms │ +1.14x faster │
│ QQuery 8     │                           54.67ms │                             49.70ms │ +1.10x faster │
│ QQuery 9     │                           78.47ms │                             66.34ms │ +1.18x faster │
│ QQuery 10    │                           67.30ms │                             59.63ms │ +1.13x faster │
│ QQuery 11    │                           16.11ms │                             14.86ms │ +1.08x faster │
│ QQuery 12    │                           42.73ms │                             33.38ms │ +1.28x faster │
│ QQuery 13    │                           38.63ms │                             32.24ms │ +1.20x faster │
│ QQuery 14    │                           31.97ms │                             29.82ms │ +1.07x faster │
│ QQuery 15    │                           47.89ms │                             42.21ms │ +1.13x faster │
│ QQuery 16    │                           16.23ms │                             14.64ms │ +1.11x faster │
│ QQuery 17    │                          103.78ms │                             95.82ms │ +1.08x faster │
│ QQuery 18    │                          134.12ms │                            117.78ms │ +1.14x faster │
│ QQuery 19    │                           52.97ms │                             48.17ms │ +1.10x faster │
│ QQuery 20    │                           46.33ms │                             39.97ms │ +1.16x faster │
│ QQuery 21    │                          110.85ms │                             93.35ms │ +1.19x faster │
│ QQuery 22    │                           18.11ms │                             16.01ms │ +1.13x faster │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 1228.72ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 1075.33ms │
│ Average Time (on-demand-repartition-with-config)   │   55.85ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   48.88ms │
│ Queries Faster                                     │        21 │
│ Queries Slower                                     │         0 │
│ Queries with No Change                             │         1 │
└────────────────────────────────────────────────────┴───────────┘
--------------------
Benchmark tpch_sf10.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ on-demand-repartition-with-config ┃ on-demand-not-always-add-roundrobin ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │                          843.22ms │                            943.27ms │  1.12x slower │
│ QQuery 2     │                          123.24ms │                            136.28ms │  1.11x slower │
│ QQuery 3     │                          407.30ms │                            446.22ms │  1.10x slower │
│ QQuery 4     │                          198.17ms │                            231.51ms │  1.17x slower │
│ QQuery 5     │                          604.48ms │                            671.58ms │  1.11x slower │
│ QQuery 6     │                          136.38ms │                            158.40ms │  1.16x slower │
│ QQuery 7     │                          887.74ms │                            967.93ms │  1.09x slower │
│ QQuery 8     │                          628.50ms │                            698.72ms │  1.11x slower │
│ QQuery 9     │                         1009.86ms │                           1097.19ms │  1.09x slower │
│ QQuery 10    │                          570.20ms │                            559.98ms │     no change │
│ QQuery 11    │                           90.48ms │                             88.46ms │     no change │
│ QQuery 12    │                          299.50ms │                            282.05ms │ +1.06x faster │
│ QQuery 13    │                          421.75ms │                            421.93ms │     no change │
│ QQuery 14    │                          231.66ms │                            231.48ms │     no change │
│ QQuery 15    │                          384.69ms │                            412.02ms │  1.07x slower │
│ QQuery 16    │                           96.85ms │                             96.86ms │     no change │
│ QQuery 17    │                         1088.84ms │                           1088.24ms │     no change │
│ QQuery 18    │                         1874.24ms │                           1587.75ms │ +1.18x faster │
│ QQuery 19    │                          462.80ms │                            395.33ms │ +1.17x faster │
│ QQuery 20    │                          429.06ms │                            378.36ms │ +1.13x faster │
│ QQuery 21    │                         1564.34ms │                           1344.72ms │ +1.16x faster │
│ QQuery 22    │                          144.52ms │                            130.80ms │ +1.10x faster │
└──────────────┴───────────────────────────────────┴─────────────────────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary                                  ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (on-demand-repartition-with-config)     │ 12497.84ms │
│ Total Time (on-demand-not-always-add-roundrobin)   │ 12369.07ms │
│ Average Time (on-demand-repartition-with-config)   │   568.08ms │
│ Average Time (on-demand-not-always-add-roundrobin) │   562.23ms │
│ Queries Faster                                     │          6 │
│ Queries Slower                                     │         10 │
│ Queries with No Change                             │          6 │
└────────────────────────────────────────────────────┴────────────┘
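
For readers checking the tables above, the Change column appears to follow from the two timing columns: the ratio of the first branch's time to the second's, reported as "faster" or "slower" once it moves more than about 5% in either direction. The sketch below reproduces that labelling. The function name `describe_change` and the 5% threshold are assumptions inferred from the rows shown here, not taken from the benchmark tooling itself.

```python
def describe_change(baseline_ms: float, comparison_ms: float,
                    threshold: float = 0.05) -> str:
    """Label a pair of timings in the style of the Change column.

    The 5% "no change" band is an assumption inferred from the tables,
    not a documented setting.
    """
    # ratio > 1 means the comparison branch finished sooner than the baseline
    ratio = baseline_ms / comparison_ms
    if ratio > 1 + threshold:
        return f"+{ratio:.2f}x faster"
    if ratio < 1 - threshold:
        return f"{1 / ratio:.2f}x slower"
    return "no change"


# Rows taken from the tpch_mem_sf10 table above
print(describe_change(566.53, 49.00))     # Query 14
print(describe_change(321.89, 665.48))    # Query 12
print(describe_change(1335.99, 1342.40))  # Query 1
```

Under this reading, a row such as Query 12 in tpch_mem_sf10 (321.89ms vs 665.48ms) is a real regression, while Query 1 (1335.99ms vs 1342.40ms) falls inside the noise band.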

OK, let's revert it then, and we'll see later how it could be improved in other ways.

@Weijun-H force-pushed the on-demand-repartition-with-config branch from 977c2d0 to ad72077 on February 16, 2025 14:24
Labels
common (Related to common crate), core (Core DataFusion crate), documentation (Improvements or additions to documentation), optimizer (Optimizer rules), physical-expr (Physical Expressions), proto (Related to proto crate), sqllogictest (SQL Logic Tests (.slt))
Development

Successfully merging this pull request may close these issues.

Alternative approaches to "fan-out" style RepartitionExec